package com.hanxiaozhang.tx.listener;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hanxiaozhang.tx.entity.OrderEntity;
import com.hanxiaozhang.tx.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import org.springframework.transaction.NoTransactionException;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;


/**
 * 〈一句话功能简述〉<br>
 * 〈生产者执行本地事务监听〉
 *
 * @author hanxinghua
 * @create 2022/10/9
 * @since 1.0.0
 */
@Slf4j
@Service
@RocketMQTransactionListener  // 表明这个一个生产端的消息监听器，需要配置监听的事务消息生产者组
public class ProducerLocalTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;

    /**
     * 执行本地事务
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        RocketMQLocalTransactionState state = RocketMQLocalTransactionState.UNKNOWN;
        try {
            // 或者从消息body中获取orderId：OrderEntity order= (OrderEntity)msg.getPayload()
            String orderId = (String) msg.getHeaders().get("orderId");
            // 更新订单信息
            orderService.updatePayStatusByOrderId(orderId);
            state = RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("执行本地事务失败，失败原因:{}", e);
            e.printStackTrace();
            state = RocketMQLocalTransactionState.ROLLBACK;
        }
        return state;
    }

    /**
     * MQ回调检查本地事务执行情况
     * <p>
     *
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {

        RocketMQLocalTransactionState state = RocketMQLocalTransactionState.UNKNOWN;
        try {
            String orderId = (String) msg.getHeaders().get("orderId");
            boolean isPaySuccess = orderService.checkOrderPaySuccess(orderId);
            if (isPaySuccess) {
                state = RocketMQLocalTransactionState.COMMIT;
            } else {
                state = RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            log.error("MQ回调检查本地事务执行情况失败，失败原因:{}", e);
            e.printStackTrace();
        }
        return state;
    }
}
